package org.example.stream;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

/**
 * 发布-处理-订阅模型
 * Process筛选合格数据推送给订阅者
 * SubmissionPublisher：发布者
 * Processor：数据处理器
 */
public class SomeProcessor extends SubmissionPublisher<String> implements Flow.Processor<Integer, String> {

    // 发布对象
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(8);
    }

    @Override
    public void onNext(Integer item) {
        System.out.println("======处理器正在处理消息数据：" + item);
        if (item < 50) {
            // 提交给订阅者
            this.submit("消息已处理：" + item);
        }
        this.subscription.request(10);
    }

    @Override
    public void onError(Throwable throwable) {
        throwable.printStackTrace();
        this.subscription.cancel();
    }

    @Override
    public void onComplete() {
        System.out.println("处理完毕");
        this.close();
    }
}
